#Tracy’s Github Repo - https://github.com/met-ad-688/assignment-03-tanyasiii
# Load the dataset
import pandas as pd
import plotly.express as px
import plotly.io as pio
from pyspark.sql import SparkSession
import re
import numpy as np
import plotly.graph_objects as go
from pyspark.sql.functions import col, split, explode, regexp_replace, transform, when
from pyspark.sql import functions as F
from pyspark.sql.functions import col, monotonically_increasing_id
# Fix the NumPy seed for reproducible jitter later; render Plotly inline.
np.random.seed(51)
pio.renderers.default = "notebook"

# Initialize Spark Session
spark = SparkSession.builder.appName("LightcastData").getOrCreate()

# Load Data.
# multiLine=true lets quoted fields span newlines; escape must be the single
# character `"` (the original had stray spaces around it, which is not a
# valid escape character for Spark's CSV reader).
df = (
    spark.read.option("header", "true")
    .option("inferSchema", "true")
    .option("multiLine", "true")
    .option("escape", "\"")
    .csv("data/lightcast_job_postings.csv")
)
df.createOrReplaceTempView("job_postings")

# Show Schema and Sample Data
# print("---This is Diagnostic check, No need to print it in the final doc---")
# df.printSchema()  # comment this line when rendering the submission
# df.show(5)
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/24 00:47:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[Stage 1:> (0 + 1) / 1] 25/09/24 00:47:46 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
## Casting Salaries
## Casting Salaries
# Cast salary and experience columns to numeric, and strip embedded
# newlines/carriage returns from the education text so it exports cleanly.
# NOTE: the original chain ended with a dangling `\` line continuation
# followed by a comment line, which is a SyntaxError; parenthesized
# chaining avoids continuations entirely.
df = (
    df.withColumn("SALARY_FROM", col("SALARY_FROM").cast("float"))
    .withColumn("SALARY_TO", col("SALARY_TO").cast("float"))
    .withColumn("SALARY", col("SALARY").cast("float"))
    .withColumn("MIN_YEARS_EXPERIENCE", col("MIN_YEARS_EXPERIENCE").cast("float"))
    .withColumn("MAX_YEARS_EXPERIENCE", col("MAX_YEARS_EXPERIENCE").cast("float"))
    .withColumn(
        "EDUCATION_LEVELS_NAME",
        regexp_replace(col("EDUCATION_LEVELS_NAME"), r"[\n\r]", ""),
    )
)
## Computing Medians
def compute_median(sdf, col_name):
    """Approximate median of *col_name* in Spark DataFrame *sdf*.

    Uses approxQuantile with a 1% relative error. Returns None when the
    column has no non-null values (approxQuantile yields an empty list).
    """
    quantiles = sdf.approxQuantile(col_name, [0.5], 0.01)
    if not quantiles:
        return None
    return quantiles[0]
# Approximate medians for each salary field (used for imputation below).
median_from = compute_median(df, "SALARY_FROM")
median_to = compute_median(df, "SALARY_TO")
median_salary = compute_median(df, "SALARY")
print("Medians:", median_from, median_to, median_salary)

## Impute missing values using the medians.
# compute_median returns None for an all-null column, and fillna() rejects
# None values — include only the medians that were actually computed.
fill_values = {
    column: median
    for column, median in [
        ("SALARY_FROM", median_from),
        ("SALARY_TO", median_to),
        ("SALARY", median_salary),
    ]
    if median is not None
}
if fill_values:
    df = df.fillna(fill_values)

## Compute average salary as the midpoint of the posted range.
df = df.withColumn("Average_Salary", (col("SALARY_FROM") + col("SALARY_TO")) / 2)

## Remove rows with a null employment type.
df = df.na.drop(subset=["EMPLOYMENT_TYPE_NAME"])
# df.select("Average_Salary", "SALARY", "EDUCATION_LEVELS_NAME", "REMOTE_TYPE_NAME", "MAX_YEARS_EXPERIENCE", "LOT_V6_SPECIALIZED_OCCUPATION_NAME").show(5, truncate=False)
# df.select("Average_Salary", "SALARY", "EDUCATION_LEVELS_NAME", "REMOTE_TYPE_NAME", "MAX_YEARS_EXPERIENCE", "LOT_V6_SPECIALIZED_OCCUPATION_NAME").show(5, truncate=False)
## Selecting required columns & exporting data / saving to CSV.
export_cols = [
    "EDUCATION_LEVELS_NAME",
    "REMOTE_TYPE_NAME",
    "MAX_YEARS_EXPERIENCE",
    "Average_Salary",
    "SALARY_TO",
    "SALARY_FROM",
    "SALARY",
    "LOT_V6_SPECIALIZED_OCCUPATION_NAME",
    "LOT_OCCUPATION_NAME",
    "NAICS2_NAME",
    "EMPLOYMENT_TYPE_NAME",
    "MIN_YEARS_EXPERIENCE",
]
df_selected = df.select(*export_cols)
pdf = df_selected.toPandas()

# Remove stray characters BEFORE exporting — the original wrote the CSV
# first and cleaned afterwards, so the exported file kept the junk.
# Strip non-ASCII characters from the employment type text.
pdf["EMPLOYMENT_TYPE_NAME"] = pdf["EMPLOYMENT_TYPE_NAME"].astype(str).apply(
    lambda x: re.sub(r"[^\x00-\x7F]+", "", x)
)
# Strip newlines, backslashes, quotes and square brackets from education text.
pdf["EDUCATION_LEVELS_NAME"] = pdf["EDUCATION_LEVELS_NAME"].astype(str).str.replace(
    r"[\n\r\\\"\[\]]", "", regex=True
)

## Export the cleaned frame.
pdf.to_csv("lightcast_cleaned.csv", index=False)

print(pdf.columns.tolist())
print("Data cleaning complete. Rows retained:", len(pdf))
[Stage 2:> (0 + 1) / 1] [Stage 3:> (0 + 1) / 1] [Stage 4:> (0 + 1) / 1]
Medians: 87295.0 130042.0 115024.0
['EDUCATION_LEVELS_NAME', 'REMOTE_TYPE_NAME', 'MAX_YEARS_EXPERIENCE', 'Average_Salary', 'SALARY_TO', 'SALARY_FROM', 'SALARY', 'LOT_V6_SPECIALIZED_OCCUPATION_NAME', 'LOT_OCCUPATION_NAME', 'NAICS2_NAME', 'EMPLOYMENT_TYPE_NAME', 'MIN_YEARS_EXPERIENCE']
Data cleaning complete. Row retained: 72454
#Question 1a - Salary Distribution by Industry
# Question 1a - Salary Distribution by Industry:
# box plot of SALARY per NAICS2 industry, outliers shown as points.
industry_box = px.box(
    pdf,
    x="NAICS2_NAME",
    y="SALARY",
    title="Salary Distribution by Industry",
    color_discrete_sequence=["purple"],
    points="outliers",
)
industry_box.update_layout(
    font_family="Times New Roman",
    title_font_size=16,
    xaxis_title="Industry",
    yaxis_title="Salary",
    xaxis_tickangle=45,
)
industry_box.show()
industry_box.write_html("Q1a.html")
# industry_box.write_image("Q1a.png")
# Analysis:
#Analysis:
# Question 1b - Salary Distribution by Employment Type:
# same box-plot layout as Q1a, grouped by employment type instead.
employment_box = px.box(
    pdf,
    x="EMPLOYMENT_TYPE_NAME",
    y="SALARY",
    title="Salary Distribution by Employment Type",
    color_discrete_sequence=["orange"],
    points="outliers",
)
employment_box.update_layout(
    font_family="Times New Roman",
    title_font_size=16,
    xaxis_title="Employment Type",
    yaxis_title="Salary",
    xaxis_tickangle=45,
)
employment_box.show()
employment_box.write_html("Q1b.html")
# employment_box.write_image("Q1b.png")
# Analysis:
#Question 2 - Salary Analysis by ONET Occupation Type
# Question 2 - Salary Analysis by ONET Occupation Type.
# Top 10 occupations by posting count, with the median salary of each.
# NOTE(review): the "job_postings" view was registered before the salary
# casting/imputation above, so this aggregates the raw SALARY values —
# confirm that is intended.
occupation_sql = """
SELECT
    LOT_OCCUPATION_NAME AS Occupation_Name,
    PERCENTILE(SALARY, 0.5) AS Median_Salary,
    COUNT(*) AS Job_Postings
FROM job_postings
GROUP BY LOT_OCCUPATION_NAME
ORDER BY Job_Postings DESC
LIMIT 10
"""
saonet = spark.sql(occupation_sql)
saonet_pd = saonet.toPandas()
saonet_pd.head()
# Bubble chart: occupation vs median salary, bubble size/color = posting count.
# Fix: the labels dict keyed on "LOT_OCCUPATION_NAME", a column that does not
# exist in saonet_pd (its column is "Occupation_Name"), so that label never
# applied. Keyed on the actual column names here.
fig = px.scatter(
    saonet_pd,
    x="Occupation_Name",
    y="Median_Salary",
    size="Job_Postings",
    title="Salary Analysis by LOT Occupation Type (Bubble Chart)",
    labels={
        "Occupation_Name": "LOT Occupation",
        "Median_Salary": "Median Salary",
        "Job_Postings": "Number of Job Postings",
    },
    hover_name="Occupation_Name",
    size_max=60,
    width=1000,
    height=600,
    color="Job_Postings",
    color_continuous_scale="Plasma",
)
fig.update_layout(
    font_family="Times New Roman",
    font_size=16,
    title_font_size=20,
    xaxis_title="LOT Occupation",
    yaxis_title="Median Salary",
    xaxis=dict(
        tickangle=-45,
        showline=True,
        linecolor="black",
    ),
    yaxis=dict(
        showline=True,
        linecolor="black",
    ),
)
fig.show()
fig.write_html("Q2a.html")
# Analysis:
#Analysis:
#Question 3 - Salary by Educational Level
# Question 3 - Salary by Educational Level.
# Bucket the free-text education field into two coarse groups using
# case-insensitive regexes; everything unmatched becomes "Other".
lower_pattern = "(?i)Bachelor'?s?|Associate|GED|Highschool|No Education"
higher_pattern = "(?i)Master'?s?|PhD|Doctorate|professional"
df = df.withColumn(
    "EDU_GROUP",
    when(col("EDUCATION_LEVELS_NAME").rlike(lower_pattern), "Lower Degrees")
    .when(col("EDUCATION_LEVELS_NAME").rlike(higher_pattern), "Higher Degrees")
    .otherwise("Other"),
)

# Ensure numeric dtypes, then keep rows with positive experience and salary.
df = df.withColumn("MAX_YEARS_EXPERIENCE", col("MAX_YEARS_EXPERIENCE").cast("float"))
df = df.withColumn("Average_Salary", col("Average_Salary").cast("float"))
df = df.filter(
    col("MAX_YEARS_EXPERIENCE").isNotNull()
    & (col("MAX_YEARS_EXPERIENCE") > 0)
    & col("Average_Salary").isNotNull()
    & (col("Average_Salary") > 0)
)

# Only the two named education groups are plotted.
df_filtered = df.filter(col("EDU_GROUP").isin(["Lower Degrees", "Higher Degrees"]))
df_pd = df_filtered.toPandas()

# Add jitter to experience for readability (seeded for reproducibility).
np.random.seed(51)
jitter = np.random.uniform(-0.3, 0.3, size=len(df_pd))
df_pd["Experience_Jitter"] = df_pd["MAX_YEARS_EXPERIENCE"] + jitter
# Scatter of (jittered) experience vs average salary, colored by education group.
edu_scatter = px.scatter(
    df_pd,
    x="Experience_Jitter",
    y="Average_Salary",
    color="EDU_GROUP",
    hover_data=["LOT_V6_SPECIALIZED_OCCUPATION_NAME"],
    title="Experience VS Salary by Education Level",
    opacity=0.7,
    color_discrete_sequence=["green", "red"],
)
edu_scatter.update_traces(marker=dict(size=7, line=dict(width=1, color="black")))
edu_scatter.update_layout(
    font_family="Times New Roman",
    title_font=dict(size=20),
    font=dict(size=16),
    xaxis_title="Years of Experience",
    yaxis_title="Average Salary($)",
    legend_title="Education Groups",
    xaxis=dict(gridcolor="lightblue", tickmode="linear", dtick=1),
    yaxis=dict(gridcolor="lightblue"),
)
edu_scatter.show()
edu_scatter.write_html("Q3.html")
# Analysis:
#Analysis:
# Question 4 - Salary by Remote Work type.
# Normalize the remote-type text into three buckets; null is treated as onsite.
df = df.withColumn(
    "REMOTE_GROUP",
    when(col("REMOTE_TYPE_NAME").rlike("(?i)^Remote$"), "Remote")
    .when(col("REMOTE_TYPE_NAME").rlike("(?i)^Hybrid Remote$"), "Hybrid")
    .when(
        col("REMOTE_TYPE_NAME").isNull()
        | col("REMOTE_TYPE_NAME").rlike("(?i)^Not Remote$"),
        "Onsite",
    )
    .otherwise("Other"),
)

# --- Step 2: Keep numeric columns as float & filter valid rows ---
df = df.withColumn("MAX_YEARS_EXPERIENCE", col("MAX_YEARS_EXPERIENCE").cast("float"))
df = df.withColumn("Average_Salary", col("Average_Salary").cast("float"))
df = df.filter(
    col("MAX_YEARS_EXPERIENCE").isNotNull()
    & (col("MAX_YEARS_EXPERIENCE") > 0)
    & col("Average_Salary").isNotNull()
    & (col("Average_Salary") > 0)
)

# --- Step 3: Filter only the main three remote types ---
df_filtered = df.filter(col("REMOTE_GROUP").isin(["Remote", "Hybrid", "Onsite"]))

# --- Step 4: Convert to Pandas ---
df_pd = df_filtered.toPandas()
# Scatter of experience vs average salary, colored by remote-work group.
remote_scatter = px.scatter(
    df_pd,
    x="MAX_YEARS_EXPERIENCE",
    y="Average_Salary",
    color="REMOTE_GROUP",
    hover_data=["LOT_V6_SPECIALIZED_OCCUPATION_NAME"],
    title="Experience vs Salary by Remote Work Type",
    opacity=0.7,
    color_discrete_sequence=["yellow", "magenta", "blue"],
)
remote_scatter.update_traces(marker=dict(size=7, line=dict(width=1, color="black")))
remote_scatter.update_layout(
    font_family="Times New Roman",
    title_font=dict(size=20),
    font=dict(size=16),
    xaxis_title="Years of Experience",
    yaxis_title="Average Salary ($)",
    legend_title="Remote Work Type",
    xaxis=dict(gridcolor="lightblue", tickmode="linear", dtick=1),
    yaxis=dict(gridcolor="lightblue"),
)
remote_scatter.show()
remote_scatter.write_html("Q4.html")
#Analysis: